package cc.iotkit.plugins.tcp.server;


import cc.iotkit.common.constant.Constants;
import cc.iotkit.common.exception.BizException;
import cc.iotkit.plugin.core.IPluginScript;
import cc.iotkit.plugin.core.thing.IThingService;
import cc.iotkit.plugin.core.thing.actions.DeviceState;
import cc.iotkit.plugin.core.thing.actions.up.DeviceStateChange;
import cc.iotkit.plugin.core.thing.actions.up.PropertyReport;
import cc.iotkit.plugins.tcp.bean.DataUp;
import cc.iotkit.plugins.tcp.bean.ReportedData;
import cc.iotkit.plugins.tcp.cilent.VertxTcpClient;
import cc.iotkit.plugins.tcp.conf.TcpServerConfig;
import cc.iotkit.plugins.tcp.enums.CommandType;
import cc.iotkit.plugins.tcp.enums.DeviceType;
import cc.iotkit.plugins.tcp.parser.DeviceData;
import cc.iotkit.plugins.tcp.parser.EdgeComputingDataDecoder;
import cc.iotkit.plugins.tcp.parser.EdgeComputingDataPackage;
import cc.iotkit.script.IScriptEngine;
import cn.hutool.core.util.IdUtil;
import com.gitee.starblues.bootstrap.annotation.AutowiredType;
import com.gitee.starblues.core.PluginInfo;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.parsetools.RecordParser;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author huangwenl
 * @date 2022-10-13
 */
@Slf4j
@Service
public class TcpServerVerticle extends AbstractVerticle {

    @Getter
    @Setter
    private TcpServerConfig config;

    /**
     * 用于存储与服务器连接的TCP客户端实例。键是客户端地址，值是 VertxTcpClient 的实例。
     *
     */
    private final Map<String, VertxTcpClient> clientMap = new ConcurrentHashMap<>();

    private final Map<String, String> dnToPk = new HashMap<>();

    private final Map<String, Long> heartbeatDevice = new HashMap<>();//用于跟踪设备的最后心跳时间。这用于检测设备是否仍然在线。

    private final Map<String, Set> gatewayAccessoryEquipment = new HashMap<>();

    @Setter
    private long keepAliveTimeout = Duration.ofSeconds(30).toMillis();

    private NetServer netServer; //netServer 是Vert.x NetServer 的实例，它用于接受和处理TCP连接。

    @Getter
    private IScriptEngine scriptEngine;//是一个用于执行自定义脚本的引擎，可能用于解码消息或执行其他自定义逻辑。

    @Autowired
    private PluginInfo pluginInfo;

    @Autowired
    @AutowiredType(AutowiredType.Type.MAIN_PLUGIN)
    private IPluginScript pluginScript;

    @Autowired
    @AutowiredType(AutowiredType.Type.MAIN_PLUGIN)
    private IThingService thingService;

    @Override
    public void stop() {
        if (netServer != null) {
            netServer.close(rst -> {
                log.info("tcp server close:{}", rst.succeeded());
            });
        }

        log.info("tcp server stopped");
    }

    /**
     * 创建配置文件
     */
    @PostConstruct
    public void initConfig() {
        log.info("initConfig:{}", pluginScript.getClass().getName());
        //获取脚本引擎
        scriptEngine = pluginScript.getScriptEngine(pluginInfo.getPluginId());
        if (scriptEngine == null) {
            throw new BizException("script engine is null");
        }
        Executors.newSingleThreadScheduledExecutor().schedule(this::initTcpServer, 3, TimeUnit.SECONDS);
    }


    /**
     * 初始TCP服务
     */
    private void initTcpServer() {
        netServer = vertx.createNetServer(
                new NetServerOptions().setHost(config.getHost())
                        .setPort(config.getPort()));
        netServer.connectHandler(this::acceptTcpConnection);
        netServer.listen(config.createSocketAddress(), result -> {
            if (result.succeeded()) {
                log.info("tcp server startup on {}", result.result().actualPort());
            } else {
                result.cause().printStackTrace();
            }
        });
    }

    public void sendMsg(String addr, Buffer msg) {
        VertxTcpClient tcpClient = clientMap.get(addr);
        if (tcpClient != null) {
            tcpClient.sendMessage(msg);
        }
    }

    @Scheduled(fixedRate = 40, timeUnit = TimeUnit.SECONDS)
    private void offlineCheckTask() {
        log.info("keepClientTask");
        Set<String> clients = new HashSet<>(clientMap.keySet());
        for (String key : clients) {
            VertxTcpClient client = clientMap.get(key);
            if (!client.isOnline()) {
                client.shutdown();
            }
        }

        heartbeatDevice.keySet().iterator().forEachRemaining(addr -> {
            Long time = heartbeatDevice.get(addr);
            //心跳超时，判定为离线
            if (System.currentTimeMillis() - time > keepAliveTimeout * 2) {
                heartbeatDevice.remove(addr);
                //排除门禁和摄像头
//                if (!Constants.DOOR_PK.equals(dnToPk.get(addr))
//                && !Constants.CAMERA_PK.equals(dnToPk.get(addr))) {
//                    //离线上报
//                    thingService.post(pluginInfo.getPluginId(), DeviceStateChange.builder()
//                            .id(IdUtil.simpleUUID())
//                            .productKey(dnToPk.get(addr))
//                            .deviceId(addr)
//                            .state(DeviceState.OFFLINE)
//                            .time(System.currentTimeMillis())
//                            .build());
//                }
            }
        });
    }

    /**
     * TCP连接处理逻辑
     *
     * @param socket socket
     */
    protected void acceptTcpConnection(NetSocket socket) {
        // 客户端连接处理
        //为连接生成一个唯一的客户端ID，包括一个UUID和客户端的远程地址。
        String clientId = IdUtil.simpleUUID() + "_" + socket.remoteAddress();
        //创建一个 VertxTcpClient 实例来管理TCP客户端的状态，设置一个保活超时时间。
        VertxTcpClient client = new VertxTcpClient(clientId);
        client.setKeepAliveTimeoutMs(keepAliveTimeout);
        try {
            // 为socket设置异常处理器来打印异常堆栈，并设置一个关闭处理器来在连接关闭时记录信息并关闭客户端。
            socket.exceptionHandler(Throwable::printStackTrace).closeHandler(nil -> {
                log.debug("tcp server client [{}] closed", socket.remoteAddress());
                client.shutdown();
            });
            // 这个地方是在TCP服务初始化的时候设置的 parserSupplier
            client.setKeepAliveTimeoutMs(keepAliveTimeout);
            client.setSocket(socket);

           RecordParser parser1 =  RecordParser.newDelimited("}]}]}", buffer -> {
               log.info("进入RecordParser");
                // 将 Buffer 转换为字符串，然后解析 JSON
               EdgeComputingDataPackage dataPackage =  EdgeComputingDataDecoder.decode(buffer) ;
               if (dataPackage.getCode() == CommandType.CODE_DATA_UP.getCode()) {
                   //根据设备密钥--获取设备DN

                   Set<String> set = new HashSet();
                   List<? extends DeviceData> deviceDataList = dataPackage.getPayload().getDeviceDataList();
                   for(DeviceData deviceData:deviceDataList){
                       DataUp du =  (DataUp)deviceData;
                       String id = deviceData.getId();
                       dnToPk.put(id,DeviceType.getPkByCode(du.getCode()));
                   /*    set.add();*/
                       Map<String, Object> rst = new HashMap<>();
                       List<ReportedData> data = du.getData();
                       for (ReportedData rp:data){
                           String name = rp.getName();
                           String value = rp.getValue();
                           rst.put(name,value);
                       }
                       online(id);
                       thingService.post(pluginInfo.getPluginId(), PropertyReport.builder()
                               .id(IdUtil.simpleUUID())
                               .productKey(DeviceType.getPkByCode(du.getCode()))
                               .deviceId(du.getId())
                               .params(rst)
                               .time(System.currentTimeMillis())
                               .build());
                   }
               }else if (dataPackage.getCode() == CommandType.CODE_REGISTER.getCode()) {

               }
            });
            client.setParser(parser1);
            log.debug("accept tcp client [{}] connection", socket.remoteAddress());
        } catch (Exception e) {
            e.printStackTrace();
            client.shutdown();
        }
    }

    private void online(String addr) {
        if (!heartbeatDevice.containsKey(addr)) {
            //第一次心跳，上线
            thingService.post(pluginInfo.getPluginId(), DeviceStateChange.builder()
                    .id(IdUtil.simpleUUID())
                    .productKey(dnToPk.get(addr))
                    .deviceId(addr)
                    .state(DeviceState.ONLINE)
                    .time(System.currentTimeMillis())
                    .build());
        }
    }

    private void online(String deviceName,String pk) {
        if (!heartbeatDevice.containsKey(deviceName)) {
            //第一次心跳，上线
            thingService.post(pluginInfo.getPluginId(), DeviceStateChange.builder()
                    .id(IdUtil.simpleUUID())
                    .productKey(pk)
                    .deviceId(deviceName)
                    .state(DeviceState.ONLINE)
                    .time(System.currentTimeMillis())
                    .build());
        }
    }

}
